package com.jxx.bigdata;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.Test;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class HdfsOperate {

    @Test
    public void mkdirToHdfs() throws IOException {
        Configuration configuration = new Configuration();
        //如果是操作hdfs上面的文件，一定要配置fs.defaultFS
        configuration.set("fs.defaultFS","hdfs://node01:8020");
        FileSystem fileSystem = FileSystem.get(configuration);
        fileSystem.mkdirs(new Path("/jxx/dir1"));
        fileSystem.close();
    }

    @Test
    public void uploadFile() throws IOException {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS","hdfs://node01:8020");
        FileSystem fileSystem = FileSystem.get(configuration);
        fileSystem.copyFromLocalFile(new Path("file:///E:\\test_data\\hello.txt"),new Path("hdfs://node01:8020/jxx/dir1"));
        fileSystem.close();
    }

    @Test
    public void downloadFile() throws IOException {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS","hdfs://node01:8020");
        FileSystem fileSystem = FileSystem.get(configuration);
        fileSystem.copyToLocalFile(new Path("hdfs://node01:8020/jxx/dir1/hello.txt"),new Path("file:///E:\\test_data\\hello2.txt"));
        fileSystem.close();
    }


    @Test
    public void testListFiles() throws IOException, InterruptedException, URISyntaxException {
        // 1获取文件系统
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://node01:8020"), configuration);
        // 2 获取文件详情
        RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
        while(listFiles.hasNext()){
            LocatedFileStatus status = listFiles.next();
            // 输出详情
            // 文件名称
            System.out.println(status.getPath().getName());
            // 长度
            System.out.println(status.getLen());
            // 权限
            System.out.println(status.getPermission());
            // 分组
            System.out.println(status.getGroup());
            // 获取存储的块信息
            BlockLocation[] blockLocations = status.getBlockLocations();

            for (BlockLocation blockLocation : blockLocations) {
                // 获取块存储的主机节点
                String[] hosts = blockLocation.getHosts();
                for (String host : hosts) {
                    System.out.println(host);
                }
            }
        }
        // 3 关闭资源
        fs.close();
    }


    /**
     * 适用于HDFS的自定义操作，其实API的底层也是使用IO流进行操作的
     * @throws IOException
     * @throws InterruptedException
     * @throws URISyntaxException
     */
    @Test
    public void putFileToHDFS() throws IOException, InterruptedException, URISyntaxException {
        // 1 获取文件系统
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://node01:8020"), configuration);
        // 2 创建输入流
        FileInputStream fis = new FileInputStream(new File("E:\\test_data\\helo.txt"));
        // 3 获取输出流
        FSDataOutputStream fos = fs.create(new Path("hdfs://node01:8020/outresult.txt"));
        // 4 流对拷
        IOUtils.copy(fis, fos);
        // 5 关闭资源
        IOUtils.closeQuietly(fos);
        IOUtils.closeQuietly(fis);
        fs.close();
    }


}
